package scales.utils.iteratee
import scalaz.{IterV, Enumerator, Input, EphemeralStream}
import scalaz.IterV._
import scales.utils.ScalesUtils
import scales.utils.io
/** Adds an `eval` operation that forces an iteratee towards a result by signalling end of input. */
trait Eval[WHAT,RETURN] {

  /** The iteratee being evaluated. */
  val orig : IterV[WHAT, RETURN]

  /**
   * Rebuilds `orig` as a `Done` when it has already finished; otherwise hands
   * `EOF` to its continuation and returns whatever iteratee that produces.
   */
  def eval : IterV[WHAT, RETURN] =
    orig.fold(
      done = (result, remaining) => Done(result, remaining),
      cont = continuation => continuation(EOF[WHAT]))
}
/** Implicit helpers: `eval` syntax for iteratees and an `Enumerator` instance for `Iterator`. */
trait IterateeImplicits {

  /** Wraps `i` in an `Eval` so that `eval` can be called on it. */
  implicit def toEval[WHAT, RETURN]( i : IterV[WHAT, RETURN] ) = new Eval[WHAT, RETURN] {
    lazy val orig = i
  }

  /**
   * Feeds the elements of an `Iterator` to an iteratee one at a time. Stops as
   * soon as the iterator has no more elements or the iteratee is `Done`, and
   * returns the iteratee in the state it has reached.
   */
  implicit val iteratorEnumerator = new Enumerator[Iterator] {
    @annotation.tailrec
    def apply[E,A](iter: Iterator[E], i: IterV[E,A]): IterV[E,A] =
      if (iter.isEmpty) i
      else i match {
        case Done(_, _) => i
        case Cont(k) =>
          // Read the element exactly once, before it is wrapped as an input.
          val x : E = iter.next
          apply(iter, k(El(x)))
      }
  }
}
trait Iteratees {
/**
 * Consumes elements for as long as `f` holds. Finishes with `Some` of the
 * first element for which `f` is false, or with `None` if EOF arrives first.
 */
def dropWhile[E](f: (E) => Boolean) : IterV[E, Option[E]] = {
  def skipping(in: Input[E]): IterV[E, Option[E]] =
    in(el = candidate =>
         if (f(candidate)) Cont(skipping)
         else Done(Some(candidate), IterV.Empty[E]),
       empty = Cont(skipping),
       eof = Done(None, EOF[E]))
  Cont(skipping)
}
/**
 * Finishes with `Some` of the first element satisfying `f`, or `None` if EOF
 * arrives first. Implemented by dropping elements while `f` is false.
 */
def find[E](f: (E) => Boolean) : IterV[E, Option[E]] =
  dropWhile((e: E) => !f(e))
/**
 * Collects every element for which `f` is true and, on EOF, finishes with
 * them in the order they were received.
 *
 * @param f predicate selecting the elements to keep
 * @return an iteratee that only finishes on EOF
 */
def filter[E](f: (E) => Boolean) : IterV[E, Iterable[E]] = {
  // `kept` is held newest-first so that adding a match is a constant-time
  // prepend rather than a linear append; the order is restored once at EOF.
  def step(kept : List[E])(s: Input[E]): IterV[E, Iterable[E]] =
    s(el = e => {
        if (f(e))
          Cont(step(e :: kept))
        else
          Cont(step(kept))
      },
      empty = Cont(step(kept)),
      eof = {
        // Reverse once, up front, so the finished value is a plain list.
        val inOrder = kept.reverse
        Done(inOrder, EOF[E])
      })
  Cont(step(List()))
}
/** An iteratee whose result is a collection of values paired with a further iteratee over the same input type. */
type ResumableIterList[E,A] = IterV[E, (Iterable[A],IterV[E,_])]
/** An iteratee whose result is a single value paired with a further iteratee over the same input type. */
type ResumableIter[E,A] = IterV[E, (A, IterV[E,_])]
/**
 * Returns the second half of a finished resumable iteratee's result pair,
 * cast to a `ResumableIter`.
 *
 * Fails with "Was not done" when `iter` has not finished.
 */
def extractCont[E,A]( iter : ResumableIter[E, A] ) =
  iter.fold[ResumableIter[E,A]](
    done = (x, i) =>
      x._2.asInstanceOf[ResumableIter[E,A]],
    cont = f => error("Was not done") )
/**
 * Returns `Some` of the value half of a finished resumable iteratee's result
 * pair, or `None` when `iter` has not finished.
 */
def extract[E,A]( iter : ResumableIter[E, A] ) =
  iter.fold[Option[A]](
    done = (x, i) =>
      Some(x._1),
    cont = f => None )
/** True when `iter` has finished, false while it is still a continuation. */
def isDone[E,A]( iter : IterV[E, A] ) =
  iter.fold[Boolean](
    done = (result, leftover) => true,
    cont = pending => false )
/**
 * True when the input carried by a finished iteratee is `Empty`.
 * Fails with "Iteratee is not Done" when `iter` has not finished.
 */
def isEmpty[E,A]( iter : IterV[E,A] ) =
  iter.fold[Boolean](
    done = (result, leftover) => IterV.Empty.unapply[E](leftover),
    cont = pending => error("Iteratee is not Done") )
/**
 * True when the input carried by a finished iteratee is `EOF`.
 * Fails with "Iteratee is not Done" when `iter` has not finished.
 */
def isEOF[E,A]( iter : IterV[E,A] ) =
  iter.fold[Boolean](
    done = (result, leftover) => EOF.unapply[E](leftover),
    cont = pending => error("Iteratee is not Done") )
/**
 * Adapts a plain iteratee into a ResumableIter. Whenever the wrapped iteratee
 * finishes, its result is paired with a continuation that starts again from
 * `oiter`, and the input carried by the Done is passed through unchanged.
 */
implicit def toResumableIter[E,A]( oiter : IterV[E,A]) : ResumableIter[E,A] = {
// `iter` is the current state of the wrapped iteratee, `s` the input to process.
def step(iter : IterV[E,A])( s : Input[E]) : ResumableIter[E, A] = {
val next = iter match {
// Already finished: report its result; `s` is not passed on.
case Done(x, y) => Done((x, Cont(step(oiter))),y)
case Cont(k) =>
{
k(s) match {
// Finished on this input: the paired continuation restarts from `oiter`.
case i@Done(x, y) => Done((x, Cont(step(oiter))),y)
// Still wants input: keep stepping the updated iteratee.
case i@Cont(_) => Cont(step(i))
}
}
}
// NOTE(review): the cast presumably reconciles the inferred type of the
// branches above with the declared result type - confirm before removing.
next.asInstanceOf[ResumableIter[E,A]]
}
Cont(step(oiter))
}
/**
 * A resumable fold. After every element it finishes with the updated fold
 * value, paired with a continuation that carries on from that value. On EOF
 * it finishes with the current value, paired with a continuation that starts
 * again from `init`.
 *
 * @param f    combines an element with the value folded so far
 * @param init the starting value
 */
def foldI[E,A]( f : (E,A) => A )( init : A ) : ResumableIter[E,A] = {
  def accumulate( soFar : A )( in : Input[E] ) : ResumableIter[E,A] =
    in(el = { item =>
         val updated = f(item, soFar)
         Done((updated, Cont(accumulate(updated))), IterV.Empty[E])
       },
       empty = Cont(accumulate(soFar)),
       eof = Done((soFar, Cont(accumulate(init))), IterV.EOF[E]))
  Cont(accumulate(init))
}
/**
 * Repeatedly runs a resumable iteratee over `it`, folding `f` over each value
 * it finishes with, until it finishes with EOF. The value carried by that
 * final EOF result is not passed to `f`.
 *
 * @param it            the source handed to the enumerator
 * @param initAcc       starting accumulator
 * @param initResumable the iteratee to run and resume
 * @param f             combines the accumulator with each finished value
 * @return the accumulator once the iteratee is Done with EOF
 */
def foldOnDone[E,A, ACC, F[_]]( it : F[E] )( initAcc : ACC, initResumable : ResumableIter[E,A] )( f : (ACC, A) => ACC )(implicit e : Enumerator[F] ) : ACC = {
  import ScalesUtils._

  var iteratee = initResumable(it).eval
  var finished = isDone(iteratee)
  var acc = initAcc

  // Keep going until the iteratee is Done *and* that Done carries EOF.
  while( !(finished && isEOF(iteratee)) ) {
    if (finished) {
      extract(iteratee) match {
        case None =>
          return acc
        case Some(value) =>
          acc = f(acc, value)
          iteratee = extractCont(iteratee)
      }
    }
    // Feed the source again and signal end of input to force a result.
    iteratee = iteratee(it).eval
    finished = isDone(iteratee)
  }
  acc
}
/**
 * Exposes the successive results of a resumable iteratee run over `it` as an
 * `Iterator`. The first result is computed when the instance is constructed,
 * and each call to `next` computes the following one before returning.
 */
class ResumableIterIterator[E,A,F[_]]( it : F[E])(init : ResumableIter[E,A])(implicit e : Enumerator[F]) extends Iterator[A] {
  import ScalesUtils._

  /** Current state of the iteratee. */
  var cur = init(it).eval
  /** Whether `cur` has finished. */
  var isdone = isDone(cur)
  /** The value extracted from `cur`, if it has finished. */
  var r = extract(cur)

  /** Returns the pending value after advancing to the following result. */
  def next = {
    val pending = r
    // Resume from the continuation of the current result before handing back
    // the value that was already waiting.
    cur = extractCont(cur)(it).eval
    isdone = isDone(cur)
    r = extract(cur)
    pending.get
  }

  /** True while the iteratee is finished with a value and has not reached EOF. */
  def hasNext = isdone && !isEOF(cur) && r.isDefined
}
/** Builds a `ResumableIterIterator` that runs `initResumable` over `it`. */
def withIter[E,A,F[_]]( it : F[E])(initResumable : ResumableIter[E,A])(implicit e : Enumerator[F]) =
  new ResumableIterIterator[E,A,F](it)(initResumable)(e)
/**
 * Runs a list of resumable iteratees side by side. Each element is offered to
 * every iteratee in the list; if at least one of them finishes on that
 * element, the combined iteratee finishes with the values of all that did,
 * paired with a continuation over the updated list. Otherwise it continues.
 * Empty input leaves the list untouched; EOF finishes with an empty result.
 */
def onDone[E,A](originalList : List[ResumableIter[E,A]]) : ResumableIterList[E,A] = {
def step(l : List[ResumableIter[E,A]])(s: Input[E]): ResumableIterList[E,A] =
s(el = e => {
// Values produced by iteratees that finished on this element.
var res : List[A] = Nil
// State of every iteratee that was fed this element.
// Both lists are built by prepending.
var newl : List[ ResumableIter[E,A] ] = Nil
// Feeds `e` to one continuation, records the resulting iteratee and, if it
// finished, its value.
@inline def add( k : (scalaz.Input[E]) => scalaz.IterV[E,(A, scalaz.IterV[E, _])]) {
val d = k(El(e))
newl = d :: newl
d match {
case Done(x, _) => res = x._1 :: res
case _ => ()
}
}
val i = l.iterator
while(i.hasNext) {
i.next match {
case iter@Done(e1, _) =>
// An iteratee that finished with EOF is not fed and is left out of `newl`.
if (isEOF(iter)) Nil
else {
// Finished with Empty: resume through the continuation stored in its result.
if (isEmpty(iter))
e1._2 match {
case i @ Cont(k) => add(k.asInstanceOf[(scalaz.Input[E]) => scalaz.IterV[E,(A, scalaz.IterV[E, _])]])
case _ => error("Continuation can only be a Cont")
}
else error("Can only handle EOF or Empty for Done")
}
case iter@Cont(k) => add(k)
}
}
// Nothing finished on this element: keep waiting with the updated list.
if (res.isEmpty)
Cont(step(newl))
else
Done((res, Cont(step(newl))), IterV.Empty[E])
},
empty = Cont(step(l)),
eof = Done((Nil, Cont(step(l))), EOF[E]))
Cont(step(originalList))
}
/** A resumable fold that adds one for every element received, starting from 0. */
def runningCount[E] =
  foldI[E,Long]((_ : E, soFar : Long) => soFar + 1 )(0)
/**
 * Appends one received `CharSequence` to `to` and finishes with that same
 * sequence. Empty input keeps waiting; EOF finishes with an empty string.
 *
 * @param to the destination that receives the appended text
 */
def appendTo( to : Appendable ) : IterV[CharSequence, CharSequence] = {
  def write(in: Input[CharSequence]): IterV[CharSequence, CharSequence] =
    in(el = { chars =>
         to.append(chars)
         Done(chars, IterV.Empty[CharSequence])
       },
       empty = Cont(write),
       eof = Done("",IterV.EOF[CharSequence]))
  Cont(write)
}
/**
 * Applies `f` to one received element and finishes with the result.
 * Both empty input and EOF leave the iteratee waiting for an element.
 *
 * @param f the conversion applied to the element
 */
def evalWith[FROM,TO]( f : (FROM) => TO ) : IterV[FROM, TO] = {
  def awaiting(in: Input[FROM]): IterV[FROM, TO] =
    in(el = { item =>
         // Convert once, up front, and finish with that value.
         val converted = f(item)
         Done(converted, IterV.Empty[FROM])
       },
       empty = Cont(awaiting),
       eof = Cont(awaiting))
  Cont(awaiting)
}
/**
 * Adapts an iteratee over `A` into one over `E` by passing each element
 * through `f` first. Empty and EOF inputs are forwarded as they are. When
 * `dest` finishes, the result is reported together with an EOF input.
 *
 * @param dest the iteratee that consumes the converted elements
 * @param f    converts an incoming element into one `dest` accepts
 */
def enumerateeMap[E, A, R]( dest : IterV[A,R])( f : E => A ) : IterV[E, R] = {
  def adapt( inner : IterV[A,R] ) : IterV[E, R] =
    inner.fold(
      done = (result, leftover) => Done(result, IterV.EOF[E]),
      cont = feed => Cont((in: Input[E]) =>
        in( el = e => adapt(feed(IterV.El(f(e)))),
            empty = adapt(feed(IterV.Empty[A])),
            eof = adapt(feed(IterV.EOF[A])))
      )
    )
  adapt(dest)
}
/**
 * Adds up every element received, starting from the numeric zero, and
 * finishes with the total on EOF.
 *
 * @param n supplies addition and zero for `T`
 */
def sum[T](implicit n: Numeric[T]): IterV[T,T] = {
  def adding(total: T)( in : Input[T] ) : IterV[T, T] =
    in( el = item => Cont(adding(n.plus(total, item))),
        empty = Cont(adding(total)),
        eof = Done(total, IterV.EOF[T])
    )
  Cont(adding(n.zero))
}
/**
 * Applies `f` to one received element and acts on the `Input` it returns:
 * an element finishes with that stream, empty keeps waiting, and EOF finishes
 * with an empty stream. EOF on the outer input also finishes with an empty
 * stream.
 *
 * @param f maps an incoming element to an input carrying a stream of `A`
 */
def mapTo[E, A]( f: E => Input[EphemeralStream[A]] ): IterV[E, EphemeralStream[A]] = {
  def awaiting(in: Input[E]): IterV[E, EphemeralStream[A]] =
    in( el = item => {
          val mapped = f(item)
          mapped( el = stream => {
                    Done(stream, IterV.Empty[E])
                  },
                  empty = Cont(awaiting),
                  eof = Done(EphemeralStream.empty, IterV.EOF[E])
          )
        },
        empty = Cont(awaiting),
        eof = Done(EphemeralStream.empty, IterV.EOF[E])
    )
  Cont(awaiting)
}
/**
 * Iteratee form of a fold over the results of a resumable iteratee. Each time
 * `initIter` finishes, `f` combines the accumulator with the finished value.
 * A result carrying Empty is resumed on the next input; a result carrying EOF
 * finishes with the accumulator. A result carrying an element is an error.
 *
 * @param initAcc  starting accumulator
 * @param initIter the resumable iteratee to drive
 * @param f        combines the accumulator with each finished value
 */
def foldOnDoneIter[E,A, ACC]( initAcc : ACC, initIter : ResumableIter[E,A])( f : (ACC, A) => ACC ) : IterV[E, ACC] = {
  def next( acc : ACC, i : ResumableIter[E,A]) : IterV[E, ACC] =
    i.fold(
      done = (ac, y) => {
        val (value, resumeFrom) = ac
        val folded = f(acc, value)
        y match {
          case IterV.El(_) =>
            error("Cannot process an input element from Done")
          case IterV.Empty() =>
            Cont( (in : Input[E]) => {
              // Resume through the stored continuation, which must be a Cont.
              val resumed : ResumableIter[E,A] = resumeFrom match {
                case Done(doneValue, doneInput) => error("got a Done from a resumableIter cont "+ doneValue +" "+doneInput)
                case Cont(k) =>
                  k(in).asInstanceOf[ResumableIter[E,A]]
              }
              next(folded, resumed)
            })
          case IterV.EOF() => Done(folded, IterV.EOF[E])
        }
      },
      cont = k => Cont((in: Input[E]) =>
        next(acc, k(in)))
    )
  next(initAcc, initIter)
}
/**
 * Connects an iteratee that turns each E into a stream of A (`toMany`) to an
 * iteratee that consumes A (`dest`). Every stream produced by `toMany` is fed
 * element by element into `dest`, and each result `dest` produces is
 * surfaced as a result of the returned ResumableIter[E, R].
 */
def enumToMany[E, A, R]( dest: ResumableIter[A,R])( toMany: ResumableIter[E, EphemeralStream[A]]): ResumableIter[E, R] = {
// Thunk standing for "no pending elements".
val empty = () => EphemeralStream.empty
// Feeds elements of the pending stream into `i` until `i` is done or the
// stream runs out; returns the iteratee reached and the unconsumed remainder.
def loop( i: ResumableIter[A,R], s: () => EphemeralStream[A] ):
(ResumableIter[A,R], () => EphemeralStream[A]) = {
var c: ResumableIter[A,R] = i
var cs: EphemeralStream[A] = s()
while(!isDone(c) && !cs.isEmpty) {
val (nc, ncs): (ResumableIter[A,R], EphemeralStream[A]) = c.fold(
// The loop guard requires `c` not to be done, so this branch is only a fallback.
done = (a, y) => (c, s()),
cont =
k => {
val head = cs.head()
(k(IterV.El(head)), cs.tail())
}
)
c = nc
cs = ncs
}
(c, () => cs)
}
// Hands input `x` to the continuation of `toMany` and forwards the outcome
// to `k`, the continuation of the destination iteratee.
def pumpNext(x: Input[E], toMany: scalaz.Input[E] => ResumableIter[E,EphemeralStream[A]], k: scalaz.Input[A] => ResumableIter[A,R] ): ResumableIter[E, R] = {
val afterNewCall = toMany(x)
afterNewCall.fold(
done = (nextContPair, rest) => {
val (e1, nextCont) = nextContPair
val nextContR = nextCont.asInstanceOf[ResumableIter[E,scalaz.EphemeralStream[A]]]
// toMany finished with EOF: pass EOF on to the destination.
if (isEOF(afterNewCall)) {
next(k(IterV.EOF[A]), empty, nextContR)
} else {
// toMany produced an empty stream: pass Empty on to the destination.
if (e1.isEmpty) {
next(k(IterV.Empty[A]), empty, nextContR)
}
else {
// Feed the first element now and keep the tail as the pending stream.
val h = e1.head()
next(k(IterV.El(h)), e1.tail, nextContR)
}
}
},
// toMany wants more input before producing anything: pass Empty on.
cont = k1 => {
next(k(IterV.Empty[A]), empty, afterNewCall)
}
)
}
// Handles a destination that is still a continuation: drains any pending
// stream first, otherwise waits for the next E input.
def contk( k: scalaz.Input[A] => ResumableIter[A,R], i: ResumableIter[A,R], s: () => EphemeralStream[A], toMany: ResumableIter[E, EphemeralStream[A]] ): ResumableIter[E, R] = {
if (!s().isEmpty) {
val (ni, ns) = loop(i, s)
next(ni, ns, toMany)
} else
Cont((x: Input[E]) =>
x( el = e => {
toMany.fold (
done = (a, y) => {
val (e1, nextContR) = a
val nextCont = nextContR.asInstanceOf[ResumableIter[E,scalaz.EphemeralStream[A]]]
error("Unexpected State for enumToMany - Cont but toMany is done")
},
cont = toManyCont => {
pumpNext(x, toManyCont, k)
}
)
},
empty = {
next(k(IterV.Empty[A]), empty, toMany)
},
eof = {
next(k(IterV.EOF[A]), empty, toMany)
}
))
}
// Handles a destination that has finished with result pair `a` and input `y`.
def doneWith(a: (R, ResumableIter[A,R]), y: Input[A], i: ResumableIter[A,R], s: () => EphemeralStream[A], toMany: ResumableIter[E, EphemeralStream[A]], internalEOF: Boolean ): ResumableIter[E, R] = {
val (res, nextCont) = a
val returnThis : ResumableIter[E, R] =
// Terminal result when the destination's continuation is done with EOF,
// toMany is done with EOF, or the destination finished on an EOF while
// `internalEOF` is false.
if ((isDone(nextCont) && isEOF(nextCont)) ||
(isDone(toMany) && isEOF(toMany)) ||
(EOF.unapply(y) && !internalEOF )
) {
Done((res, Done(res, IterV.EOF[E])), IterV.EOF[E])
} else {
// Otherwise pair the result with a way to resume the destination.
Done((res,
{
val cont = () => next(nextCont.asInstanceOf[ResumableIter[A,R]], s, toMany, true)
// The input handed to this continuation is ignored; it only triggers `cont`.
val n = Cont( (i: Input[E]) => {
cont()
})
// With pending elements, resume straight away instead of waiting for input.
if (s().isEmpty) {
n
} else {
cont()
}
}), IterV.Empty[E])
}
// On such an EOF, also pass EOF to toMany's continuation; the value of this
// expression is discarded.
if (EOF.unapply(y) && !internalEOF) {
toMany.fold(done= (a1, y1) => false,
cont = k => {
k(IterV.EOF[E]); false
})
}
returnThis
}
// Dispatches on the state of the destination iteratee.
def next( i: ResumableIter[A,R], s: () => EphemeralStream[A], toMany: ResumableIter[E, EphemeralStream[A]], internalEOF: Boolean = false ): ResumableIter[E, R] =
i.fold(
done = (a, y) => doneWith(
a.asInstanceOf[(R, ResumableIter[A, R])], y, i, s, toMany, internalEOF),
cont =
k => {
contk(k, i, s, toMany)
}
)
next(dest, empty, toMany)
}
}